這裡蒐索程式師資訊,查找有用的技術資料
當前位置:首頁 » 編程語言 » sparksql數組
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksql數組

發布時間: 2022-04-20 05:38:57

① 如何使用sparksql向mysql中插入數據

f(isset($_POST['submit'])&&$_POST['submit']=='提交'){
3 //判斷是否是提交過來的
4 $intext = $_POST['intext'];
5 if($intext!=null||$intext!=''){
6 $link = mysql_connect("localhost", "root", "123456");
7 //資料庫配置信息 第一個參數資料庫位置第二個是用戶名第三個是密碼
8 mysql_select_db("szn_test");
9 //設置要使用的資料庫
10 $sql = "select * from demo where res = '".$intext."'";

② sparksql緩存表能做廣播變數嗎

共享變數
通常情況下,當向Spark操作(如map,rece)傳遞一個函數時,它會在一個遠程集群節點上執行,它會使用函數中所有變數的副本。這些變數被復制到所有的機器上,遠程機器上並沒有被更新的變數會向驅動程序回傳。在任務之間使用通用的,支持讀寫的共享變數是低效的。盡管如此,Spark提供了兩種有限類型的共享變數,廣播變數和累加器。

廣播變數
廣播變數允許程序員將一個只讀的變數緩存在每台機器上,而不用在任務之間傳遞變數。廣播變數可被用於有效地給每個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播演算法來分發變數,進而減少通信的開銷。
Spark的動作通過一系列的步驟執行,這些步驟由分布式的洗牌操作分開。Spark自動地廣播每個步驟每個任務需要的通用數據。這些廣播數據被序列化地緩存,在運行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地創建廣播變數才有用。

通過在一個變數v上調用SparkContext.broadcast(v)可以創建廣播變數。廣播變數是圍繞著v的封裝,可以通過value方法訪問這個變數。舉例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在創建了廣播變數之後,在集群上的所有函數中應該使用它來替代使用v.這樣v就不會不止一次地在節點之間傳輸了。另外,為了確保所有的節點獲得相同的變數,對象v在被廣播之後就不應該再修改。

累加器
累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支持。它可以被用來實現計數器和總和。Spark原生地只支持數字類型的累加器,編程者可以添加新類型的支持。如果創建累加器時指定了名字,可以在Spark的UI界面看到。這有利於理解每個執行階段的進程。(對於python還不支持)
累加器通過對一個初始化了的變數v調用SparkContext.accumulator(v)來創建。在集群上運行的任務可以通過add或者"+="方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程序能夠讀取它的值,通過累加器的value方法。
下面的代碼展示了如何把一個數組中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

盡管上面的例子使用了內置支持的累加器類型Int,但是開發人員也可以通過繼承AccumulatorParam類來創建它們自己的累加器類型。AccumulatorParam介面有兩個方法:
zero方法為你的類型提供一個0值。
addInPlace方法將兩個值相加。
假設我們有一個代表數學vector的Vector類。我們可以向下面這樣實現:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加介面來累加數據,盡管結果的類型和累加的數據類型可能不一致(例如,通過收集在一起的元素來創建一個列表)。同時,SparkContext..accumulableCollection方法來累加通用的Scala的集合類型。

累加器僅僅在動作操作內部被更新,Spark保證每個任務在累加器上的更新操作只被執行一次,也就是說,重啟任務也不會更新。在轉換操作中,用戶必須意識到每個任務對累加器的更新操作可能被不只一次執行,如果重新執行了任務和作業的階段。
累加器並沒有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當RDD因為動作操作被計算時才被更新。因此,當執行一個惰性的轉換操作,比如map時,不能保證對累加器值的更新被實際執行了。下面的代碼片段演示了此特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在這里,accum的值仍然是0,因為沒有動作操作引起map被實際的計算.

③ spark 怎麼通過寫sql語句一行一行讀數據

spark 怎麼通過寫sql語句一行一行讀數據
Spark SQL就是shark ,也就是SQL on Spark。如果沒記錯的話,shark的開發利用了hive的API,所以支持讀取HBase。而且Spark的數據類型兼容範圍大於Hadoop,並且包含了Hadoop所支持的任何數據類型。

④ spark sql怎麼去獲取hive 表一定日期范圍內的數據

select orderid,fenjian,timee
from
(
select orderid,fenjian,timee,row_number(orderid,fenjian) rn
from (
select orderid,fenjian,timee from tableName
distribute by orderid,fenjian sort by orderid,fenjian,timee asc
) t1
) t2
where t2.rn=1

⑤ spark sql 列怎麼轉換數據類型

Java獲取資料庫的表中各欄位的欄位名,代碼如下:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
public class TestDemo {
public static Connection getConnection() {
Connection conn = null;
try {
Class.forName("com.mysql.jdbc.Driver");
String url = "jdbc:mysql://資料庫IP地址:3306/資料庫名稱";
String user = "資料庫用戶名";
String pass = "資料庫用戶密碼";
conn = DriverManager.getConnection(url, user, pass);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
public static void main(String[] args) {
Connection conn = getConnection();
String sql = "select * from AccessType";
PreparedStatement stmt;
try {
stmt = conn.prepareStatement(sql);
ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData data = rs.getMetaData();
for (int i = 1; i <= data.getColumnCount(); i++) {
// 獲得所有列的數目及實際列數
int columnCount = data.getColumnCount();
// 獲得指定列的列名
String columnName = data.getColumnName(i);
// 獲得指定列的列值
int columnType = data.getColumnType(i);
// 獲得指定列的數據類型名
String columnTypeName = data.getColumnTypeName(i);
// 所在的Catalog名字
String catalogName = data.getCatalogName(i);
// 對應數據類型的類
String columnClassName = data.getColumnClassName(i);
// 在資料庫中類型的最大字元個數
int columnDisplaySize = data.getColumnDisplaySize(i);
// 默認的列的標題
String columnLabel = data.getColumnLabel(i);
// 獲得列的模式
String schemaName = data.getSchemaName(i);
// 某列類型的精確度(類型的長度)
int precision = data.getPrecision(i);
// 小數點後的位數
int scale = data.getScale(i);
// 獲取某列對應的表名
String tableName = data.getTableName(i);
// 是否自動遞增
boolean isAutoInctement = data.isAutoIncrement(i);
// 在資料庫中是否為貨幣型
boolean isCurrency = data.isCurrency(i);
// 是否為空
int isNullable = data.isNullable(i);
// 是否為只讀
boolean isReadOnly = data.isReadOnly(i);
// 能否出現在where中
boolean isSearchable = data.isSearchable(i);
System.out.println(columnCount);
System.out.println("獲得列" + i + "的欄位名稱:" + columnName);
System.out.println("獲得列" + i + "的類型,返回SqlType中的編號:"+ columnType);
System.out.println("獲得列" + i + "的數據類型名:" + columnTypeName);
System.out.println("獲得列" + i + "所在的Catalog名字:"+ catalogName);
System.out.println("獲得列" + i + "對應數據類型的類:"+ columnClassName);
System.out.println("獲得列" + i + "在資料庫中類型的最大字元個數:"+ columnDisplaySize);
System.out.println("獲得列" + i + "的默認的列的標題:" + columnLabel);
System.out.println("獲得列" + i + "的模式:" + schemaName);
System.out.println("獲得列" + i + "類型的精確度(類型的長度):" + precision);
System.out.println("獲得列" + i + "小數點後的位數:" + scale);
System.out.println("獲得列" + i + "對應的表名:" + tableName);
System.out.println("獲得列" + i + "是否自動遞增:" + isAutoInctement);
System.out.println("獲得列" + i + "在資料庫中是否為貨幣型:" + isCurrency);
System.out.println("獲得列" + i + "是否為空:" + isNullable);
System.out.println("獲得列" + i + "是否為只讀:" + isReadOnly);
System.out.println("獲得列" + i + "能否出現在where中:"+ isSearchable);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}

⑥ spark sql dataset怎麼做分組排序呢

sparksql怎樣取分組後的topn
Spark SQL 開窗函數
1、Spark 1.5.x版本以後,在Spark SQL和DataFrame中引入了開窗函數,比如最經典的就是我們的row_number(),可以讓我們實現分組取topn的邏輯。
2、做一個案例進行topn的取值(利用Spark的開窗函數),不知道是否還有印象,我們之前在最早的時候,做過topn的計算,當時是非常麻煩的。但是現在用了Spark SQL之後,非常方便。

⑦ sparkSQL和spark有什麼區別

Spark為結構化數據處理引入了一個稱為Spark SQL的編程模塊。簡而言之,sparkSQL是Spark的前身,是在Hadoop發展過程中,為了給熟悉RDBMS但又不理解MapRece的技術人員提供快速上手的工具。
sparkSQL提供了一個稱為DataFrame(數據框)的編程抽象,DF的底層仍然是RDD,並且可以充當分布式SQL查詢引擎。

SparkSql有哪些特點呢?

1)引入了新的RDD類型SchemaRDD,可以像傳統資料庫定義表一樣來定義SchemaRDD。

2)在應用程序中可以混合使用不同來源的數據,如可以將來自HiveQL的數據和來自SQL的數據進行Join操作。

3)內嵌了查詢優化框架,在把SQL解析成邏輯執行計劃之後,最後變成RDD的計算。

⑧ spark sql擁有哪些特點

向下兼容各種數據源,統一的編程介面,功能倆字強大,用起來一個字香

⑨ 如何使用 Spark SQL

一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項

--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL

-f <filename> 以文件方式批量執行SQL

二、Spark sql對hive支持的功能

1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF

5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()

分組統計:df.groupBy("age").count().show()

1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)

四、Spark sql性能調優

緩存數據表:sqlContext.cacheTable("tableName")

取消緩存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險